Amazon SESでDMARCの集計レポートを受信してAthenaで検索をかけてみる

Amazon SESでDMARCの集計レポートを受信してAthenaで検索をかけてみる

Clock Icon2024.02.13

初めに

DAMRCにおける集計レポートはご存知でしょうか。

DMARCではruaタグに所定のアドレスを指定することで1日に1度程度を目安にその受信サーバに届いたメールの配信状況の集約情報を受け取ることができます。

なかな個別のレポートを目で検査するのは厳しいかと思いますが、各社のDMARC分析用のサービスであったり、OSSでもparsedmarcのようなツールがあったりと現在では比較的気軽に分析することができます。

継続的な分析というよりはとりあえずの導入でSPF/DKIM対応から漏れている環境をサクッとみたい時にAWS上で何か良い感じに作れないかなぁと思って考えてみたところ、それくらいであればAmazon SESで受信したデータをS3に格納しAthenaで検索すれば使った分だけの費用で良い感じでは?
と思いついたので構築してみることにしました。

注意

https://datatracker.ietf.org/doc/html/rfc7489
The aggregate data MUST be an XML file that SHOULD be subjected to GZIP compression
...
The extension MUST be "xml" for a plain XML file, or "xml.gz" for an XML file compressed using GZIP.

DMARCの定義を見る限りgzipもしくは未圧縮のXMLで送られてくる想定ですが、実際にGMailからの集計レポートを確認してみたところZIPで送付されていました。
今回はひとまずこのzipの形式に対応する形で実装しgzip等の形式は未対応となります。

今回のコードは対応しないデータが出ても何らかエラーでますが変換されないだけなので害はそこまでないはずですが、エラーハンドリングはほとんどしていないのでご注意ください。

構成

以下のような構成となります。

[email protected]にメールが届くと届いたメールをもとにXMLを解析しJSONに変換、S3に格納します。
必要な際にユーザはAthenaを利用してクエリをかけるという形です。

https://docs.aws.amazon.com/ja_jp/ses/latest/dg/receiving-email-action-lambda.html
RequestResponse の呼び出しには 30 秒のタイムアウトがあります。
...
Amazon SES が Lambda に渡すデータには、メッセージに関するメタデータと、複数の E メールヘッダーが含まれます。ただし、メッセージの本文は含まれません。

Amazon SESから直接Lambda関数を呼び出すことも可能ですが固有の制約があったりS3に対してオブジェクトを保存する必要があるのに加え、
SAMで定義をする場合AWS::Serveless::FunctionEventsに含められる定義としてAmazon SESが指定できない関係でS3をトリガーに呼び出した方がテンプレート上の取り回しが良いためこの構成を取っています。

テンプレート・ソースコード

長くなってしまうので以下のリポジトリに格納しております。

なお受信サーバの設定については一部CloudFormationで実装できない関係で手動で設定しています。
今回のテンプレートは以前執筆した記事を元に拡張しているため受信設定周りは以下の記事もご参照ください。

DMARCレコードの設定

以下のように設定いたしました。なぜか初日にうまく届かないといったこともあって実際の検証中に一部値が変動しています。

$ dig _dmarc.example.com txt +short
"v=DMARC1; p=reject; pct=100; rua=mailto:[email protected]; fo=1;"

実際の検証の際にはIPの信頼性を下げるのであまりやりたくはないですがSPFレコードのIPを正しくないものに設定した上でメールを送信しSPFに合致しないメールとして送信しました。

実際に届いた集計レポート

GMailから送付されたS3側に格納されたメールは以下のようになっておりました。レポートはZIPとして添付されています。

Return-Path: <[email protected]>
Received: from mail-qv1-f74.google.com (mail-qv1-f74.google.com [209.85.219.74])
 by inbound-smtp.ap-northeast-1.amazonaws.com with SMTP id q57ojo9ir0irh6k0dnlhthddbuv0u252imag8eg1
 for [email protected];
 Thu, 08 Feb 2024 11:01:24 +0000 (UTC)
...
Date: Wed, 07 Feb 2024 15:59:59 -0800
Message-ID: <[email protected]>
Subject: Report domain: example.com Submitter: google.com Report-ID: xxxxxx
From: [email protected]
To: [email protected]
Content-Type: application/zip;
	name="google.com!example.com!1707264000!1707350399.zip"
Content-Disposition: attachment;
	filename="google.com!example.com!1707264000!1707350399.zip"
Content-Transfer-Encoding: base64

UEsDBAoAAAAIALdTSFjLGMPA2gEAAD8EAAAyAAAAZ29vZ2xlLmNvbSFhbGljZWZjYXQuY29tITE3
MDcyNjQwMDAhMTcwNzM1MDM5OS54bWyNVMt2pCAQ3ecrPL1v8dFqO4eQWeULMmsPjaXNiQIHMI+/
HwxgO51ZZCXeunWr6lKKnz7mKXkDbbgUj4c8zQ4JCCZ7LsbHw5+X5+...

実際にZIPから取り出した集計レポートは以下のようになっておりました。
色々試してる間にうまく届かずpタグの値を変更していたのですがこの値が異なると別々に集約されるようです。

<?xml version="1.0" encoding="UTF-8" ?>
<feedback>
  <report_metadata>
    <org_name>google.com</org_name>
    <email>[email protected]</email>
    <extra_contact_info>https://support.google.com/a/answer/2466580</extra_contact_info>
    <report_id>10734515597477549118</report_id>
    <date_range>
      <begin>1707264000</begin>
      <end>1707350399</end>
    </date_range>
  </report_metadata>
  <policy_published>
    <domain>example.com</domain>
    <adkim>r</adkim>
    <aspf>r</aspf>
    <p>reject</p>
    <sp>reject</sp>
    <pct>100</pct>
    <np>reject</np>
  </policy_published>
  <record>
    <row>
      <source_ip>xxx.xxx.xxx.xxx</source_ip>
      <count>5</count>
      <policy_evaluated>
        <disposition>reject</disposition>
        <dkim>fail</dkim>
        <spf>fail</spf>
      </policy_evaluated>
    </row>
    <identifiers>
      <header_from>example.com</header_from>
    </identifiers>
    <auth_results>
      <spf>
        <domain>example.com</domain>
        <result>softfail</result>
      </spf>
    </auth_results>
  </record>
  <record>
    <row>
      <source_ip>xxx.xxx.xxx.xxx</source_ip>
      <count>1</count>
      <policy_evaluated>
        <disposition>none</disposition>
        <dkim>fail</dkim>
        <spf>pass</spf>
      </policy_evaluated>
    </row>
    <identifiers>
      <header_from>example.com</header_from>
    </identifiers>
    <auth_results>
      <spf>
        <domain>example.com</domain>
        <result>pass</result>
      </spf>
    </auth_results>
  </record>
  <record>
    <row>
      <source_ip>yyy.yyy.yyy.yyy</source_ip>
      <count>1</count>
      <policy_evaluated>
        <disposition>none</disposition>
        <dkim>pass</dkim>
        <spf>pass</spf>
      </policy_evaluated>
    </row>
    <identifiers>
      <header_from>example.com</header_from>
    </identifiers>
    <auth_results>
      <dkim>
        <domain>example.com</domain>
        <result>pass</result>
        <selector>eogp72tvvmnugysm4vjttzghjnmzww36</selector>
      </dkim>
      <dkim>
        <domain>amazonses.com</domain>
        <result>pass</result>
        <selector>zh4gjftm6etwoq6afzugpky45synznly</selector>
      </dkim>
      <spf>
        <domain>ses.example.com</domain>
        <result>pass</result>
      </spf>
    </auth_results>
  </record>
</feedback>

Lambdaで変換した後のデータは以下の通りです。
Athenaで取り扱う関係で実際に一行になっていることに加え、展開すると長くなる関係であえてそのまま載せています。必要なかたは手元で広げてみてください

よく見かけるサンプルのレポートはrecordが単一になっていることが多く勘違いしやすいですがrecordは配列です。自前で変換処理書いてみたい方でrecordが単一のファイルを取り扱っている場合はパーサによっては配列にならないためご注意ください。

{"feedback":{"report_metadata":{"org_name":"google.com","email":"[email protected]","extra_contact_info":"https://support.google.com/a/answer/2466580","report_id":"10734515597477549118","date_range":{"begin":"1707264000","end":"1707350399"}},"policy_published":{"domain":"example.com","adkim":"r","aspf":"r","p":"reject","sp":"reject","pct":"100","np":"reject"},"record":[{"row":{"source_ip":"xxx.xxx.xxx.xxx","count":"5","policy_evaluated":{"disposition":"reject","dkim":"fail","spf":"fail"}},"identifiers":{"header_from":"example.com"},"auth_results":{"spf":{"domain":"example.com","result":"softfail"}}},{"row":{"source_ip":"xxx.xxx.xxx.xxx","count":"1","policy_evaluated":{"disposition":"none","dkim":"fail","spf":"pass"}},"identifiers":{"header_from":"example.com"},"auth_results":{"spf":{"domain":"example.com","result":"pass"}}},{"row":{"source_ip":"yyy.yyy.yyy.yyy","count":"1","policy_evaluated":{"disposition":"none","dkim":"pass","spf":"pass"}},"identifiers":{"header_from":"example.com"},"auth_results":{"dkim":[{"domain":"example.com","result":"pass","selector":"eogp72tvvmnugysm4vjttzghjnmzww36"},{"domain":"amazonses.com","result":"pass","selector":"zh4gjftm6etwoq6afzugpky45synznly"}],"spf":{"domain":"ses.example.com","result":"pass"}}}]}}

Athenaの操作

テーブルの作成までCloudFormationコード上に含めようかと思いましたがGlueのテーブルを使う場合同定義でうまく動かなかったのでAthenaのマネジメントコンソール上でクエリを流してテーブルを作成する形を取りました。 実際にに届いたレコードを元にテーブルを作成しているの指定レコードの状態等で不足するパラメータが存在する可能性がある点ご注意ください。

CREATE EXTERNAL TABLE IF NOT EXISTS dmarc_report (
  feedback struct<
    report_metadata: struct<
      org_name: string,
      email: string,
      extra_contact_info: string,
      report_id: string,
      date_range: struct<
        begin: string,
        `end`: string
      >
    >,
    policy_published: struct<
      domain: string,
      adkim: string,
      aspf: string,
      p: string,
      sp: string,
      pct: string,
      np: string
    >,
    record: array<struct<
      `row`: struct<
        source_ip: string,
        `count`: string,
        policy_evaluated: struct<
          disposition: string,
          dkim: string,
          spf: string
        >
      >,
      identifiers: struct<
        header_from: string
      >,
      auth_results: struct<
        spf: struct<
          domain: string,
          result: string
        >,
        dkim: array<
          struct<
            domain: string,
            result: string,
            selector: string
          >
        >
      >>
    >
  >
 )
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
WITH SERDEPROPERTIES ('paths'='feedback')
LOCATION 's3://rua.example.com-mail-stocker/output/catcher';

テーブルができたのでレポートに記載されておりSPF/DKIMいずれもpassではないレコードを抽出するクエリを流します。 総数はrecord.row.countにIP毎の送付数が記載されているためこちらの総和を取ります。

WITH records AS (
    SELECT feedback.record as record
    FROM dmarc_report
)
SELECT
    rec.row.source_ip,
    SUM(CAST(rec.row.count AS Integer)) AS cnt
FROM records, UNNEST(record) AS t(rec)
WHERE rec.row.policy_evaluated.spf != 'pass' AND rec.row.policy_evaluated.dkim != 'pass'
GROUP BY rec.row.source_ip
ORDER BY cnt desc;

マスクしてしまいわからないですがいずれもfailになっている最初のxxx.xxx.xxx.xxxだけが抽出されました。

条件(WHERE)をなくすと今回のレコードが7件全て載っていることが確認できます。

終わりに

DMARCの集計レポートを受信してAthenaで分析してみました。

日常的に送付を行なっているメールサーバであればある程度のメール送付件数はあるかと思いますので、IP毎の送信数を降順にソートして見覚えのあるIPがないか?という形で確認すればある程度対応漏れしている送信元の目星はつけられるかと考えております。

o high-volume denial-of-service attacks;
o deliberate construction of malformed reports intended to identify or exploit parsing or processing vulnerabilities;
o deliberate construction of reports containing false claims for the Submitter or Reported-Domain fields, including the possibility of false data from compromised but known Mail Receivers.

集計レポートを利用する際の注意点として一般技術として特定の用途に結び付けられる形でメールアドレスを公開するため、例としてRFC 7489のセクション12.2にも上記のような記載があるように偽のメールや負荷をかけるような攻撃を受ける可能性があります。
前者について独立しているため機能的な影響としては受けづらいかと思いますが利用課金を目的としたEDoS攻撃となる可能性もあります。

今回の方法ではフィルタリング等を準備しようと思うと自前でそれなりに手を入れる必要があり苦しくなってくるかと思いますので、一時的にちょっとみる程であるならともかく継続的に利用するのであればやはり各種分析サービスをご利用いただくのが無難な選択肢になるかと考えておりますのでお試しの際にはご注意ください。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.